Flink 一次打包多个环境运行 您所在的位置:网站首页 flink 打包发布 Flink 一次打包多个环境运行

Flink 一次打包多个环境运行

2023-11-29 15:18| 来源: 网络整理| 查看: 265

前言

入坑大数据之前,一直在做业务开发,必然是少不了用springboot。springboot 可以根据 spring.profiles.active来指定启动的环境信息,一个包可以运用多个环境,也在一定程度上避免了线上和开发测试不一致的情况,那么在Flink 中该如何实现再加载环境时指定环境信息呢,是否有flink.profiles.active?

当然Flink 没有提供类似的功能,我们针对Flink 提供的工具方法稍加加工即可实现。

实现

针对于flink读取配置ParameterTool 提供了很多取配置的方法,使用env.getConfig().setGlobalJobParameters(parameterTool) 它可以传递 Configuration 中的参数到 Rich 函数中,使用open() 可以获取到配置中的参数。 我们可以从ParameterTool入手,我们可以根据启动指定的args[],来决定加载什么环境的配置,当然我们需要约定好,比如说启动时指定--flink.profiles.active test 来读取我们test对应的环境信息 , 利用 ParameterTool.fromArgs(args).get("flink.profiles.active")可以获取到 flink.profiles.active 值,如果没有值就读取默认位置的配置文件。如果有值我们可以采用路径区来分环境,读取不同环境下的配置信息,当然springboot 还有配置优先级,可以meger 默认配置文件中的配置,ParameterTool提供了ParameterTool#mergeWith()方法。具体代码实现如下:

/** * @author liweigao * @date 2021/7/28 下午6:13 */ @Slf4j public class EnvOperation { private EnvOperation() { throw new RuntimeException("not support instantiation by other!"); } private static final String ENV = "flink.profiles.active"; private static final String FILE_CHARACTER = "/"; /** * @param args 参数 * @param tclass class * @param name name of the desired resource * @return inputStream */ public static InputStream getEnvStream(String[] args, Class tclass, String name) { ParameterTool fromArgs = fromArgs(args); String path = getEnvPath(fromArgs); return getEnvStream(path, tclass, name); } /** * 解析文件 * * @param path 解析激活的环境变量 * @param tclass class * @param name 文件名称 * @return inputStream */ private static InputStream getEnvStream(String path, Class tclass, String name) { if (Strings.isBlank(path)) { log.info("profiles active is : default"); return getDefaultEnvStream(tclass, name); } else { if (!name.startsWith(FILE_CHARACTER)) { name = FILE_CHARACTER + name; } log.info("profiles active is : {}", path); return tclass.getResourceAsStream(FILE_CHARACTER + path + name); } } /** * 获取指定环境的配置信息 * * @param path 路径 * @param tclass class * @param name 文件名称 * @return inputStream */ private static InputStream getSpeciallyEnvStream(String path, Class tclass, String name) { if (!name.startsWith(FILE_CHARACTER)) { name = FILE_CHARACTER + name; } return tclass.getResourceAsStream(FILE_CHARACTER + path + name); } /** * @param tclass class * @param name 文件名称 * @return inputStream */ private static InputStream getDefaultEnvStream(Class tclass, String name) { return tclass.getResourceAsStream(name); } /** * @param args 参数 * @param name name of the desired resource * @return inputStream */ public static InputStream getEnvStream(String[] args, String name) { return getEnvStream(args, EnvOperation.class, name); } /** * 获取指定环境路径 * * @param tool args * @return path * @see {@link Utils#getKeyFromArgs(String[], int)} */ private static String getEnvPath(ParameterTool tool) { return tool.get(ENV); } private static ParameterTool fromArgs(String[] args) { return ParameterTool.fromArgs(args); } public static ParameterTool getParameterToolMergeDefault(String[] args, Class tclass, String name) throws IOException { ParameterTool fromArgs = fromArgs(args); String path = getEnvPath(fromArgs); ParameterTool defaultTool = getParameterToolForInputStream(getDefaultEnvStream(tclass, name)); if (Strings.isBlank(path)) { log.info("profiles active is : default"); log.info("default config mergeWith args config"); // formArgs data is UnmodifiableMap Map map = fromArgs.toMap(); map = Maps.newHashMap(map); map.put(ENV, "default"); fromArgs = ParameterTool.fromMap(map); return defaultTool.mergeWith(fromArgs); } else { log.info("profiles active is : {}", path); ParameterTool active = getParameterToolForInputStream(getSpeciallyEnvStream(path, tclass, name)); log.info("default config mergeWith {} config", path); active = defaultTool.mergeWith(active); log.info("{} config is mergeWith args config", path); return active.mergeWith(fromArgs); } } private static ParameterTool getParameterToolForInputStream(InputStream inputStream) throws IOException { ParameterTool parameterTool = ParameterTool.fromArgs(new String[0]); if (Objects.nonNull(inputStream)) { parameterTool = ParameterTool.fromPropertiesFile(inputStream); } else { log.debug("profiles path is non-existent"); } return parameterTool; } }

获取配置时:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); ParameterTool parameterTool = EnvOperation.getParameterToolMergeDefault(args, Application.class, "/application.properties"); //设置全局配置 env.getConfig().setGlobalJobParameters(parameterTool);;

parameterTool 对象即为指定 flink.profiles.active 对应的环境信息。 配置目录结构

├── application.properties ├── test │ └── application.properties └── dev └── application.properties

当我们部署standalone 、yarn(session 、 per job or application moudle)时 启动命令需要加–flink.profiles.active [环境信息], 即可实现启动指定环境信息

总结

以上拙见,毕竟才入坑,欢迎交流,本次实现为不同环境按照目录来区分,当然我们也可以完全做到跟springboot 加载环境信息一样的功能,当然目前已经满足我们目前的需求了~点到为止,不做复杂的轮子。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有